// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "media/cast/sender/frame_sender.h"

#include <algorithm>
#include <limits>
#include <utility>
#include <vector>

#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/numerics/safe_conversions.h"
#include "base/trace_event/trace_event.h"
#include "media/cast/constants.h"
#include "media/cast/sender/sender_encoded_frame.h"

namespace media {
namespace cast {
    namespace {

        // Floor, in milliseconds, for the delay used when scheduling the next
        // re-send check.
        constexpr int kMinSchedulingDelayMs = 1;

        // Number of RTCP reports that are sent "aggressively" (one alongside each
        // frame) at the start of a session.
        constexpr int kNumAggressiveReportsSentAtStart = 100;

        // Extra frames allowed in-flight, beyond what the maximum frame rate
        // permits, to absorb short bursts of input.
        constexpr int kMaxFrameBurst = 5;

    } // namespace

// Convenience macro used in logging statements throughout this file.  It
// expands to a stream fragment that prints "AUDIO[<ssrc>] " or
// "VIDEO[<ssrc>] ", and it reads |is_audio_| and |ssrc_| at the point of
// expansion, so both names must be in scope wherever it is used.
#define SENDER_SSRC (is_audio_ ? "AUDIO[" : "VIDEO[") << ssrc_ << "] "

    // Creates an RTCP client that forwards received RTCP events to
    // |frame_sender|.  Only a weak pointer is stored, and each forwarding method
    // of this class tests it before dereferencing.
    FrameSender::RtcpClient::RtcpClient(base::WeakPtr<FrameSender> frame_sender)
        : frame_sender_(frame_sender)
    {
    }

    // Nothing to release explicitly; members clean up after themselves.
    FrameSender::RtcpClient::~RtcpClient() = default;

    // Relays a Cast feedback message to the FrameSender.  The message is dropped
    // when the weak pointer no longer refers to a sender.
    void FrameSender::RtcpClient::OnReceivedCastMessage(
        const RtcpCastMessage& cast_message)
    {
        if (!frame_sender_)
            return;
        frame_sender_->OnReceivedCastFeedback(cast_message);
    }

    // Relays a round-trip-time measurement to the FrameSender.  The measurement
    // is dropped when the weak pointer no longer refers to a sender.
    void FrameSender::RtcpClient::OnReceivedRtt(base::TimeDelta round_trip_time)
    {
        if (!frame_sender_)
            return;
        frame_sender_->OnMeasuredRoundTripTime(round_trip_time);
    }

    // Relays a picture-loss indication to the FrameSender.  The indication is
    // dropped when the weak pointer no longer refers to a sender.
    void FrameSender::RtcpClient::OnReceivedPli()
    {
        if (!frame_sender_)
            return;
        frame_sender_->OnReceivedPli();
    }

    // Constructs a sender for the stream described by |config|.
    //
    // |cast_environment| supplies the clock, task posting and event logger.
    // |transport_sender| and |congestion_control| must be non-null; they are
    // stored as raw pointers, so the caller keeps them alive for the lifetime of
    // this object.
    //
    // A zero |config.min_playout_delay| or |config.animated_playout_delay| falls
    // back to |config.max_playout_delay|.  The stream is classified as audio when
    // |config.rtp_payload_type| is at or below RtpPayloadType::AUDIO_LAST.
    //
    // As its last step the constructor initializes the transport stream,
    // handing it an RtcpClient that calls back into this object through a weak
    // pointer.
    FrameSender::FrameSender(scoped_refptr<CastEnvironment> cast_environment,
        CastTransport* const transport_sender,
        const FrameSenderConfig& config,
        CongestionControl* congestion_control)
        : cast_environment_(cast_environment)
        , transport_sender_(transport_sender)
        , ssrc_(config.sender_ssrc)
        , min_playout_delay_(config.min_playout_delay.is_zero()
                  ? config.max_playout_delay
                  : config.min_playout_delay)
        , max_playout_delay_(config.max_playout_delay)
        , animated_playout_delay_(config.animated_playout_delay.is_zero()
                  ? config.max_playout_delay
                  : config.animated_playout_delay)
        , send_target_playout_delay_(false)
        , max_frame_rate_(config.max_frame_rate)
        , num_aggressive_rtcp_reports_sent_(0)
        , duplicate_ack_counter_(0)
        , congestion_control_(congestion_control)
        , picture_lost_at_receiver_(false)
        , rtp_timebase_(config.rtp_timebase)
        , is_audio_(config.rtp_payload_type <= RtpPayloadType::AUDIO_LAST)
        , weak_factory_(this)
    {
        DCHECK(transport_sender_);
        DCHECK_GT(rtp_timebase_, 0);
        DCHECK(congestion_control_);
        // Each value is followed by its unit and a separator so the fields do not
        // run together in the log output.
        VLOG(1) << SENDER_SSRC << "min latency "
                << min_playout_delay_.InMilliseconds() << " ms, max latency "
                << max_playout_delay_.InMilliseconds() << " ms, animated latency "
                << animated_playout_delay_.InMilliseconds() << " ms";

        // We assume animated content to begin with since that is the common use
        // case today.
        SetTargetPlayoutDelay(animated_playout_delay_);

        CastTransportRtpConfig transport_config;
        transport_config.ssrc = config.sender_ssrc;
        transport_config.feedback_ssrc = config.receiver_ssrc;
        transport_config.rtp_payload_type = config.rtp_payload_type;
        transport_config.aes_key = config.aes_key;
        transport_config.aes_iv_mask = config.aes_iv_mask;

        transport_sender->InitializeStream(
            transport_config,
            base::MakeUnique<FrameSender::RtcpClient>(weak_factory_.GetWeakPtr()));
    }

    // No explicit teardown is needed; members are destroyed in the usual order.
    FrameSender::~FrameSender() = default;

    void FrameSender::ScheduleNextRtcpReport()
    {
        DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

        cast_environment_->PostDelayedTask(
            CastEnvironment::MAIN, FROM_HERE,
            base::Bind(&FrameSender::SendRtcpReport, weak_factory_.GetWeakPtr(),
                true),
            base::TimeDelta::FromMilliseconds(kRtcpReportIntervalMs));
    }

    // Sends one RTCP sender report carrying lip-sync information, and schedules
    // the next periodic report when |schedule_future_reports| is true.  Must run
    // on the MAIN thread, after at least one frame has been sent.
    void FrameSender::SendRtcpReport(bool schedule_future_reports)
    {
        DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

        // Sanity-check: We should have sent at least the first frame by this point.
        DCHECK(!last_send_time_.is_null());

        // Lip-sync info: extrapolate from the last sent frame's reference time and
        // RTP timestamp to an RTP timestamp that corresponds to "now."  Since |now|
        // almost never lands exactly on a frame boundary, the extrapolated value
        // will rarely match a timestamp actually emitted by the encoder.
        const base::TimeTicks now = cast_environment_->Clock()->NowTicks();
        const base::TimeDelta elapsed_since_reference = now - GetRecordedReferenceTime(last_sent_frame_id_);
        const RtpTimeDelta elapsed_in_rtp_units = RtpTimeDelta::FromTimeDelta(elapsed_since_reference, rtp_timebase_);
        const RtpTimeTicks now_as_rtp_timestamp = GetRecordedRtpTimestamp(last_sent_frame_id_) + elapsed_in_rtp_units;
        transport_sender_->SendSenderReport(ssrc_, now, now_as_rtp_timestamp);

        if (!schedule_future_reports)
            return;
        ScheduleNextRtcpReport();
    }

    // Stores the most recent round-trip-time measurement.  |rtt| is expected to
    // be strictly positive (checked in debug builds only).  The stored value is
    // read by OnReceivedCastFeedback() and GetAllowedInFlightMediaDuration().
    void FrameSender::OnMeasuredRoundTripTime(base::TimeDelta rtt)
    {
        DCHECK(rtt > base::TimeDelta());
        current_round_trip_time_ = rtt;
    }

    // Changes the target playout delay to |new_target_playout_delay|, limited to
    // the range [min_playout_delay_, max_playout_delay_].
    //
    // When the effective (clamped) value differs from the current target, or no
    // target has been set yet, the new value is stored, flagged to be sent along
    // with outgoing frames, and passed on to congestion control.  Otherwise the
    // call does nothing.
    void FrameSender::SetTargetPlayoutDelay(
        base::TimeDelta new_target_playout_delay)
    {
        // Clamp before testing for a change.  Comparing the raw request would
        // treat an out-of-range value that clamps to the current target as a
        // change, logging a bogus transition and re-notifying congestion control.
        new_target_playout_delay = std::max(new_target_playout_delay,
            min_playout_delay_);
        new_target_playout_delay = std::min(new_target_playout_delay,
            max_playout_delay_);
        if (send_target_playout_delay_ && target_playout_delay_ == new_target_playout_delay) {
            return;
        }
        VLOG(2) << SENDER_SSRC << "Target playout delay changing from "
                << target_playout_delay_.InMilliseconds() << " ms to "
                << new_target_playout_delay.InMilliseconds() << " ms.";
        target_playout_delay_ = new_target_playout_delay;
        send_target_playout_delay_ = true;
        congestion_control_->UpdateTargetPlayoutDelay(target_playout_delay_);
    }

    void FrameSender::ResendCheck()
    {
        DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
        DCHECK(!last_send_time_.is_null());
        const base::TimeDelta time_since_last_send = cast_environment_->Clock()->NowTicks() - last_send_time_;
        if (time_since_last_send > target_playout_delay_) {
            if (latest_acked_frame_id_ == last_sent_frame_id_) {
                // Last frame acked, no point in doing anything
            } else {
                VLOG(1) << SENDER_SSRC << "ACK timeout; last acked frame: "
                        << latest_acked_frame_id_;
                ResendForKickstart();
            }
        }
        ScheduleNextResendCheck();
    }

    void FrameSender::ScheduleNextResendCheck()
    {
        DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
        DCHECK(!last_send_time_.is_null());
        base::TimeDelta time_to_next = last_send_time_ - cast_environment_->Clock()->NowTicks() + target_playout_delay_;
        time_to_next = std::max(
            time_to_next, base::TimeDelta::FromMilliseconds(kMinSchedulingDelayMs));
        cast_environment_->PostDelayedTask(
            CastEnvironment::MAIN,
            FROM_HERE,
            base::Bind(&FrameSender::ResendCheck, weak_factory_.GetWeakPtr()),
            time_to_next);
    }

    void FrameSender::ResendForKickstart()
    {
        DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));
        DCHECK(!last_send_time_.is_null());
        VLOG(1) << SENDER_SSRC << "Resending last packet of frame "
                << last_sent_frame_id_ << " to kick-start.";
        last_send_time_ = cast_environment_->Clock()->NowTicks();
        transport_sender_->ResendFrameForKickstart(ssrc_, last_sent_frame_id_);
    }

    // Remembers |reference_time| and |rtp_timestamp| for |frame_id|.  Entries are
    // indexed by the low 8 bits of the frame ID, so a later frame with the same
    // low bits overwrites this one.  |reference_time| must not be null.
    void FrameSender::RecordLatestFrameTimestamps(FrameId frame_id,
        base::TimeTicks reference_time,
        RtpTimeTicks rtp_timestamp)
    {
        DCHECK(!reference_time.is_null());
        const auto slot = frame_id.lower_8_bits();
        frame_reference_times_[slot] = reference_time;
        frame_rtp_timestamps_[slot] = rtp_timestamp;
    }

    // Returns the reference time stored in the slot selected by the low 8 bits
    // of |frame_id|.
    base::TimeTicks FrameSender::GetRecordedReferenceTime(FrameId frame_id) const
    {
        const auto slot = frame_id.lower_8_bits();
        return frame_reference_times_[slot];
    }

    // Returns the RTP timestamp stored in the slot selected by the low 8 bits of
    // |frame_id|.
    RtpTimeTicks FrameSender::GetRecordedRtpTimestamp(FrameId frame_id) const
    {
        const auto slot = frame_id.lower_8_bits();
        return frame_rtp_timestamps_[slot];
    }

    // Returns how many sent frames have not yet been ACKed: the distance from the
    // latest ACKed frame ID to the last sent frame ID, or zero when nothing has
    // been sent so far.
    int FrameSender::GetUnacknowledgedFrameCount() const
    {
        if (!last_send_time_.is_null()) {
            const int count = last_sent_frame_id_ - latest_acked_frame_id_;
            DCHECK_GE(count, 0);
            return count;
        }
        return 0;
    }

    // Returns the maximum duration of media allowed to be in-flight: the whole
    // target playout delay window plus half of the current round-trip time, the
    // latter standing in for the time it takes an ACK to arrive.
    // TODO(miu): Research is needed, but there is likely a better formula.
    base::TimeDelta FrameSender::GetAllowedInFlightMediaDuration() const
    {
        const base::TimeDelta half_round_trip = current_round_trip_time_ / 2;
        return target_playout_delay_ + half_round_trip;
    }

    // Sends |encoded_frame| to the transport.  Must be called on the MAIN thread.
    //
    // In order, this: cancels still-pending frames when a key frame follows a
    // picture-loss indication; updates the last-sent bookkeeping; dispatches a
    // FRAME_ENCODED log event; records the frame's timestamps; sends an RTCP
    // report while fewer than kNumAggressiveReportsSentAtStart have been sent;
    // notifies congestion control; and hands the frame to the transport.
    //
    // |requested_bitrate_before_encode| is only recorded in the logged event.
    void FrameSender::SendEncodedFrame(
        int requested_bitrate_before_encode,
        std::unique_ptr<SenderEncodedFrame> encoded_frame)
    {
        DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

        VLOG(2) << SENDER_SSRC << "About to send another frame: last_sent="
                << last_sent_frame_id_ << ", latest_acked=" << latest_acked_frame_id_;

        const FrameId frame_id = encoded_frame->frame_id;
        // Captured before |last_send_time_| is overwritten below.
        const bool is_first_frame_to_be_sent = last_send_time_.is_null();

        // A key frame sent after a picture-loss indication supersedes every frame
        // between the latest ACKed frame and this one, so those are cancelled.
        if (picture_lost_at_receiver_ && (encoded_frame->dependency == EncodedFrame::KEY)) {
            picture_lost_at_receiver_ = false;
            DCHECK(frame_id > latest_acked_frame_id_);
            // Cancel sending remaining frames.
            std::vector<FrameId> cancel_sending_frames;
            for (FrameId id = latest_acked_frame_id_ + 1; id < frame_id; ++id) {
                cancel_sending_frames.push_back(id);
            }
            transport_sender_->CancelSendingFrames(ssrc_, cancel_sending_frames);
        }

        last_send_time_ = cast_environment_->Clock()->NowTicks();
        last_sent_frame_id_ = frame_id;
        // If this is the first frame about to be sent, fake the value of
        // |latest_acked_frame_id_| to indicate the receiver starts out all caught up.
        // Also, schedule the periodic frame re-send checks.
        if (is_first_frame_to_be_sent) {
            latest_acked_frame_id_ = frame_id - 1;
            ScheduleNextResendCheck();
        }

        VLOG_IF(1, !is_audio_ && encoded_frame->dependency == EncodedFrame::KEY)
            << SENDER_SSRC << "Sending encoded key frame, id=" << frame_id;

        // Report the completed encode to the event logger.
        std::unique_ptr<FrameEvent> encode_event(new FrameEvent());
        encode_event->timestamp = encoded_frame->encode_completion_time;
        encode_event->type = FRAME_ENCODED;
        encode_event->media_type = is_audio_ ? AUDIO_EVENT : VIDEO_EVENT;
        encode_event->rtp_timestamp = encoded_frame->rtp_timestamp;
        encode_event->frame_id = frame_id;
        encode_event->size = base::checked_cast<uint32_t>(encoded_frame->data.size());
        encode_event->key_frame = encoded_frame->dependency == EncodedFrame::KEY;
        encode_event->target_bitrate = requested_bitrate_before_encode;
        encode_event->encoder_cpu_utilization = encoded_frame->encoder_utilization;
        encode_event->idealized_bitrate_utilization = encoded_frame->lossy_utilization;
        cast_environment_->logger()->DispatchFrameEvent(std::move(encode_event));

        // This must happen before SendRtcpReport() is called below, because that
        // function reads the timestamps recorded for |last_sent_frame_id_|.
        RecordLatestFrameTimestamps(frame_id,
            encoded_frame->reference_time,
            encoded_frame->rtp_timestamp);

        if (!is_audio_) {
            // Used by chrome/browser/extension/api/cast_streaming/performance_test.cc
            TRACE_EVENT_INSTANT1(
                "cast_perf_test", "VideoFrameEncoded",
                TRACE_EVENT_SCOPE_THREAD,
                "rtp_timestamp", encoded_frame->rtp_timestamp.lower_32_bits());
        }

        // At the start of the session, it's important to send reports before each
        // frame so that the receiver can properly compute playout times.  The reason
        // more than one report is sent is because transmission is not guaranteed,
        // only best effort, so send enough that one should almost certainly get
        // through.
        if (num_aggressive_rtcp_reports_sent_ < kNumAggressiveReportsSentAtStart) {
            // SendRtcpReport() will schedule future reports to be made if this is the
            // last "aggressive report."
            ++num_aggressive_rtcp_reports_sent_;
            const bool is_last_aggressive_report = (num_aggressive_rtcp_reports_sent_ == kNumAggressiveReportsSentAtStart);
            VLOG_IF(1, is_last_aggressive_report)
                << SENDER_SSRC << "Sending last aggressive report.";
            SendRtcpReport(is_last_aggressive_report);
        }

        // The frame size is passed to congestion control in bits.
        congestion_control_->SendFrameToTransport(
            frame_id, encoded_frame->data.size() * 8, last_send_time_);

        // NOTE(review): nothing visible in this file resets
        // |send_target_playout_delay_| to false, so as far as can be seen here
        // every frame carries the playout delay once one has been set -- confirm
        // against the header and subclasses.
        if (send_target_playout_delay_) {
            encoded_frame->new_playout_delay_ms = target_playout_delay_.InMilliseconds();
        }

        // Opens an async trace span keyed by the frame ID; the matching END is
        // emitted by OnReceivedCastFeedback() when the frame is ACKed.
        TRACE_EVENT_ASYNC_BEGIN1("cast.stream",
            is_audio_ ? "Audio Transport" : "Video Transport",
            frame_id.lower_32_bits(), "rtp_timestamp",
            encoded_frame->rtp_timestamp.lower_32_bits());
        transport_sender_->InsertFrame(ssrc_, *encoded_frame);
    }

    // Handles a Cast feedback (ACK/NACK) message from the receiver.  Must be
    // called on the MAIN thread.
    //
    // Feeds the current RTT, if one has been measured, to congestion control and
    // ends the aggressive-report phase; counts duplicate ACKs and kick-starts
    // transmission when enough accumulate; reports the ACK to congestion control
    // and the event logger; and cancels re-sends of every frame newly covered by
    // the ACK, advancing |latest_acked_frame_id_| as it goes.
    void FrameSender::OnReceivedCastFeedback(const RtcpCastMessage& cast_feedback)
    {
        DCHECK(cast_environment_->CurrentlyOn(CastEnvironment::MAIN));

        const bool have_valid_rtt = current_round_trip_time_ > base::TimeDelta();
        if (have_valid_rtt) {
            congestion_control_->UpdateRtt(current_round_trip_time_);

            // Having the RTT value implies the receiver sent back a receiver report
            // based on it having received a report from here.  Therefore, ensure this
            // sender stops aggressively sending reports.
            if (num_aggressive_rtcp_reports_sent_ < kNumAggressiveReportsSentAtStart) {
                VLOG(1) << SENDER_SSRC
                        << "No longer a need to send reports aggressively (sent "
                        << num_aggressive_rtcp_reports_sent_ << ").";
                num_aggressive_rtcp_reports_sent_ = kNumAggressiveReportsSentAtStart;
                // Switch over to the periodic report schedule.
                ScheduleNextRtcpReport();
            }
        }

        if (last_send_time_.is_null())
            return; // Cannot get an ACK without having first sent a frame.

        // A message with neither missing packets nor later-received frames is a
        // plain ACK; only those take part in duplicate-ACK counting.
        if (cast_feedback.missing_frames_and_packets.empty() && cast_feedback.received_later_frames.empty()) {
            if (latest_acked_frame_id_ == cast_feedback.ack_frame_id) {
                VLOG(1) << SENDER_SSRC << "Received duplicate ACK for frame "
                        << latest_acked_frame_id_;
                TRACE_EVENT_INSTANT2(
                    "cast.stream", "Duplicate ACK", TRACE_EVENT_SCOPE_THREAD,
                    "ack_frame_id", cast_feedback.ack_frame_id.lower_32_bits(),
                    "last_sent_frame_id", last_sent_frame_id_.lower_32_bits());
            }
            // We only count duplicate ACKs when we have sent newer frames.
            if (latest_acked_frame_id_ == cast_feedback.ack_frame_id && latest_acked_frame_id_ != last_sent_frame_id_) {
                duplicate_ack_counter_++;
            } else {
                duplicate_ack_counter_ = 0;
            }
            // TODO(miu): The values "2" and "3" should be derived from configuration.
            // As written, this fires on the 2nd, 5th, 8th, ... consecutive duplicate.
            if (duplicate_ack_counter_ >= 2 && duplicate_ack_counter_ % 3 == 2) {
                ResendForKickstart();
            }
        } else {
            // Only count duplicated ACKs if there is no NACK request in between.
            // This is to avoid aggressive resend.
            duplicate_ack_counter_ = 0;
        }

        // Congestion control is told about every ACK, including duplicate and
        // out-of-order ones.
        base::TimeTicks now = cast_environment_->Clock()->NowTicks();
        congestion_control_->AckFrame(cast_feedback.ack_frame_id, now);
        if (!cast_feedback.received_later_frames.empty()) {
            // Ack the received frames.
            congestion_control_->AckLaterFrames(cast_feedback.received_later_frames,
                now);
        }

        // Report the ACK to the event logger.
        std::unique_ptr<FrameEvent> ack_event(new FrameEvent());
        ack_event->timestamp = now;
        ack_event->type = FRAME_ACK_RECEIVED;
        ack_event->media_type = is_audio_ ? AUDIO_EVENT : VIDEO_EVENT;
        ack_event->rtp_timestamp = GetRecordedRtpTimestamp(cast_feedback.ack_frame_id);
        ack_event->frame_id = cast_feedback.ack_frame_id;
        cast_environment_->logger()->DispatchFrameEvent(std::move(ack_event));

        const bool is_acked_out_of_order = cast_feedback.ack_frame_id < latest_acked_frame_id_;
        VLOG(2) << SENDER_SSRC
                << "Received ACK" << (is_acked_out_of_order ? " out-of-order" : "")
                << " for frame " << cast_feedback.ack_frame_id;
        if (is_acked_out_of_order) {
            // An ACK older than one already processed is only traced.
            TRACE_EVENT_INSTANT2(
                "cast.stream", "ACK out of order", TRACE_EVENT_SCOPE_THREAD,
                "ack_frame_id", cast_feedback.ack_frame_id.lower_32_bits(),
                "latest_acked_frame_id", latest_acked_frame_id_.lower_32_bits());
        } else if (latest_acked_frame_id_ < cast_feedback.ack_frame_id) {
            // Cancel resends of acked frames.
            std::vector<FrameId> frames_to_cancel;
            frames_to_cancel.reserve(cast_feedback.ack_frame_id - latest_acked_frame_id_);
            // Walk |latest_acked_frame_id_| forward one frame at a time so every
            // implicitly ACKed frame is cancelled and has its trace span closed.
            do {
                ++latest_acked_frame_id_;
                frames_to_cancel.push_back(latest_acked_frame_id_);
                // This is a good place to match the trace for frame ids
                // since this ensures we not only track frame ids that are
                // implicitly ACKed, but also handles duplicate ACKs
                TRACE_EVENT_ASYNC_END1(
                    "cast.stream", is_audio_ ? "Audio Transport" : "Video Transport",
                    latest_acked_frame_id_.lower_32_bits(), "RTT_usecs",
                    current_round_trip_time_.InMicroseconds());
            } while (latest_acked_frame_id_ < cast_feedback.ack_frame_id);
            transport_sender_->CancelSendingFrames(ssrc_, frames_to_cancel);
        }
    }

    // Records that the receiver reported picture loss.  The flag is read by
    // SendEncodedFrame(), which clears it when the next key frame is sent and
    // cancels the frames that the key frame supersedes.
    void FrameSender::OnReceivedPli()
    {
        picture_lost_at_receiver_ = true;
    }

    // Decides whether the next frame, lasting |frame_duration|, should be dropped
    // rather than accepted.  Three limits are tested in order, and the first one
    // that would be exceeded causes a drop:
    //   1. the design limit on the number of frames in-flight;
    //   2. the configured maximum frame rate, plus an allowance for bursts;
    //   3. the allowed in-flight media duration.
    // Returns true to drop the frame, false to accept it.
    bool FrameSender::ShouldDropNextFrame(base::TimeDelta frame_duration) const
    {
        // Limit 1: frames awaiting an ACK plus frames still inside the encoder.
        const int frames_in_flight = GetUnacknowledgedFrameCount() + GetNumberOfFramesInEncoder();
        if (frames_in_flight >= kMaxUnackedFrames) {
            VLOG(1) << SENDER_SSRC << "Dropping: Too many frames would be in-flight.";
            return true;
        }

        // Limit 2: the frame count that the maximum frame rate allows for the
        // media currently in-flight, with room for a short-term burst.
        const base::TimeDelta in_flight = GetInFlightMediaDuration();
        const double frames_at_max_rate = max_frame_rate_ * in_flight.InSecondsF();
        if (frames_in_flight >= frames_at_max_rate + kMaxFrameBurst) {
            VLOG(1) << SENDER_SSRC << "Dropping: Burst threshold would be exceeded.";
            return true;
        }

        // Limit 3: total media duration in-flight once this frame is included.
        const base::TimeDelta in_flight_with_frame = in_flight + frame_duration;
        const base::TimeDelta allowed = GetAllowedInFlightMediaDuration();
        if (VLOG_IS_ON(1)) {
            // Without a positive allowance the ratio is undefined, so report the
            // largest representable percentage instead.
            int64_t percent = std::numeric_limits<int64_t>::max();
            if (allowed > base::TimeDelta())
                percent = 100 * in_flight_with_frame / allowed;
            VLOG_IF(1, percent > 50)
                << SENDER_SSRC
                << in_flight.InMicroseconds() << " usec in-flight + "
                << frame_duration.InMicroseconds() << " usec for next frame --> "
                << percent << "% of allowed in-flight.";
        }
        if (in_flight_with_frame > allowed) {
            VLOG(1) << SENDER_SSRC << "Dropping: In-flight duration would be too high.";
            return true;
        }

        // Every limit is respected: accept the frame.
        return false;
    }

} // namespace cast
} // namespace media
